feat: Add singleton record fetch and MCP tool for declarative sources #872
Conversation
Implements `Source.get_record()` and `DeclarativeExecutor.fetch_record()` methods to enable fetching single records by primary key value from declarative sources.

Key features:
- `Source.get_record(stream_name, pk_value)` - Public API for fetching records
- `DeclarativeExecutor.fetch_record()` - Internal implementation using CDK components
- Primary key validation and normalization (supports string, int, and dict formats)
- Composite primary key detection (raises `NotImplementedError`)
- New `AirbyteRecordNotFoundError` exception for missing records
- Comprehensive unit tests with proper mocking

This implementation reuses existing CDK components (SimpleRetriever, HttpClient, RecordSelector) without monkey-patching or pinning CDK versions, providing a hybrid approach that works with the current CDK release.

Related to CDK PR airbytehq/airbyte-python-cdk#846

Co-Authored-By: AJ Steers <[email protected]>
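A minimal usage sketch of the API described above; the source name and config values are illustrative, and any declarative (YAML-based) source with a single-field primary key would work the same way:

```python
import airbyte as ab

# Illustrative example: source-pokeapi is a declarative source with a
# single-field primary key.
source = ab.get_source("source-pokeapi", config={"pokemon_name": "pikachu"})

# The three accepted pk_value forms described above:
record = source.get_record("pokemon", pk_value="1")          # string
record = source.get_record("pokemon", pk_value=1)            # int, coerced to "1"
record = source.get_record("pokemon", pk_value={"id": "1"})  # dict keyed by the PK field
```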
Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

```bash
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1763667280-add-get-record-without-cdk-dependency' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1763667280-add-get-record-without-cdk-dependency'
```
📝 Walkthrough

Adds single-record retrieval by primary key: new `DeclarativeExecutor.fetch_record`, new `AirbyteRecordNotFoundError` exception, `Source` helpers and `get_record` with declarative delegation and optional scanning fallback, a CLI/local helper, and unit tests covering these behaviors.
Sequence DiagramsequenceDiagram
participant User as User
participant Source as Source
participant Exec as DeclarativeExecutor
participant Retriever as SimpleRetriever
participant HTTP as HTTPRequester
User->>Source: get_record(stream_name, pk_value)
activate Source
Source->>Source: catalog_provider / normalize PK
Source->>Exec: fetch_record(stream_name, primary_key_value)
activate Exec
Exec->>Exec: resolve & unwrap stream
Exec->>Retriever: ensure SimpleRetriever & get_path(primary_key)
Exec->>HTTP: requester.request(method, path, headers, params, body, slice=StreamSlice.empty)
activate HTTP
HTTP-->>Exec: response
deactivate HTTP
alt no response or no records
Exec-->>Source: AirbyteRecordNotFoundError
else records found
Exec->>Exec: extract via record_selector or JSON-dict fallback
Exec-->>Source: first record (dict)
end
deactivate Exec
alt Exec returned record
Source-->>User: record
else error
Source-->>User: error
end
deactivate Source
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Would you like an additional targeted unit test for the JSON-dict fallback case, wdyt?

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Actionable comments posted: 1
🧹 Nitpick comments (7)
airbyte/exceptions.py (1)
415-421: Optional: include `primary_key_value` in safe logging context?

`AirbyteRecordNotFoundError` looks consistent with the surrounding connector errors. Would it be useful to also surface `primary_key_value` from this exception in `PyAirbyteError.safe_logging_dict` (similar to `stream_name`), so logs have enough context when a lookup fails, assuming the PK isn't considered sensitive in your threat model, wdyt?
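A minimal sketch of what that could look like; this assumes the exception carries a `primary_key_value` attribute and a `safe_logging_dict()` hook, which may differ from the real `PyAirbyteError` hierarchy:

```python
from typing import Any

# Sketch only: a simplified stand-in for the PR's exception class, showing
# how primary_key_value could be surfaced in safe logging output.
class AirbyteRecordNotFoundError(Exception):
    def __init__(self, stream_name: str, primary_key_value: str | None = None) -> None:
        super().__init__(f"Record not found in stream '{stream_name}'.")
        self.stream_name = stream_name
        self.primary_key_value = primary_key_value

    def safe_logging_dict(self) -> dict[str, Any]:
        result: dict[str, Any] = {"stream_name": self.stream_name}
        if self.primary_key_value is not None:
            result["primary_key_value"] = self.primary_key_value
        return result
```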
tests/unit_tests/test_get_record.py (2)

24-57: Stabilize expectations around the fetch_record "happy path"?

In `test_declarative_executor_fetch_record_stream_validation`, the "valid_stream_and_pk" case currently asserts that `fetch_record` raises one of `(NotImplementedError, AttributeError, KeyError)` (Line [56]), i.e. any internal failure after stream lookup. As the CDK integration evolves and `ModelToComponentFactory` wiring stabilizes, this path may eventually succeed, which would flip this test from "green" to "red" even though behavior improved.

Would it be more future-proof to explicitly assert only the stream-existence contract here - for example by mocking out `ModelToComponentFactory.create_component` and `SimpleRetriever` so that the test simply verifies that no `AirbyteStreamNotFoundError` is raised for an existing stream, and leaves the downstream behavior to more targeted tests, wdyt?
144-149: Use `typing.Any` instead of bare `any` in type hints?

Both `test_source_normalize_and_validate_pk_value` and `test_source_get_record_accepts_various_pk_formats` annotate `pk_value` as `any`. With `from __future__ import annotations`, that ends up as the string `"any"`, which type checkers typically won't recognize as `Any`.

Would you consider importing `Any` from `typing` and updating these annotations to `pk_value: Any` for clarity and better static-analysis support, wdyt?

Also applies to: 217-217
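A minimal sketch of the suggested annotation change; the parametrize values and test name here are illustrative, not the actual test:

```python
from __future__ import annotations

from typing import Any

import pytest


@pytest.mark.parametrize("pk_value", ["123", 123, {"id": "123"}])
def test_pk_value_annotation(pk_value: Any) -> None:
    # `Any` (not the builtin `any`) is what type checkers understand here.
    assert pk_value is not None
```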
airbyte/_executors/declarative.py (1)
18-25: Double-check CDK integration and edge cases in fetch_record

The high-level flow in `fetch_record` looks aligned with the intended design, but a few CDK integration details and edge cases might be worth validating before relying on this in production:

Factory usage and manifest shape (Lines [192]-[207])
Here you call `ModelToComponentFactory.create_component` with `model_type=type(retriever_config)` where `retriever_config` is pulled directly from the manifest dict. My understanding is that CDK factories typically expect a Pydantic model instance (or specific declarative model types) as `component_definition`, not a raw dict. Depending on the exact CDK version/contract, this could either work by accident or fail at runtime and surface as the `NotImplementedError` you wrap around it.
Would it make sense to (a) confirm what `create_component` expects in the current CDK, and/or (b) add a small integration test using a real declarative manifest to ensure this wiring actually produces a `SimpleRetriever`, wdyt?

Schema/record selector wiring (Lines [262]-[273])
`records_schema` is currently derived from `stream_config.get("schema_loader", {})`, but in many declarative manifests `schema_loader` is a configuration for a loader rather than the resolved JSON schema. Are you sure `record_selector.select_records` expects the loader config here, rather than the actual JSON schema (or even `None`), and if not, should we either:
- resolve the schema via the loader, or
- explicitly pass `{}` and rely on selectors that don't need schema,

wdyt?

Empty `records` but non-dict JSON responses (Lines [275]-[282])
When `records` is empty, you only special-case non-empty dict JSON bodies. If a connector returns `[{"id": "123", ...}]` (a single-element list) for a detail endpoint, this will currently raise `AirbyteRecordNotFoundError` even though the response is there. Would you consider handling the "single-element list" case by returning that single element (and perhaps logging a warning), or is it intentional to only support dict-shaped bodies for now, wdyt?

HTTP status / 404 handling
Since you call `send_request` directly, a 404 might be represented as either:
- an exception from the requester, or
- a normal response object with status 404 but no body/records.

If it's the latter, would you want to explicitly check `response.status_code` (when available) and map a 404 to `AirbyteRecordNotFoundError` for clearer semantics, wdyt?

Path construction & URL encoding (Line [225])
`fetch_path = f"{base_path}/{primary_key_value}".lstrip("/")` works well for numeric/simple IDs, but if a primary key ever contains reserved URL characters (e.g., spaces, slashes), the request path could be malformed. Would it be safer to `quote` the `primary_key_value` (e.g., via `urllib.parse.quote`) before appending, or at least leave a comment/TODO here to revisit once you see real-world connectors, wdyt?

Given your PR description already calls out that this isn't yet exercised against real connectors, adding a small note or follow-up task around these points could help ensure the API behaves as expected once wired into actual YAML manifests, wdyt?

Also applies to: 150-297
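For the 404 point above, a hedged sketch of the explicit mapping; the `status_code` attribute and the exception's constructor kwargs are assumptions, not the verified CDK/PyAirbyte contract:

```python
from airbyte.exceptions import AirbyteRecordNotFoundError


# Sketch only: maps a missing or 404 response to the PR's not-found error.
def raise_if_not_found(response: object, stream_name: str) -> None:
    if response is None or getattr(response, "status_code", None) == 404:
        raise AirbyteRecordNotFoundError(
            message="Record not found.",
            stream_name=stream_name,
        )
```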
airbyte/sources/base.py (3)
605-621: Clarify interaction between primary-key overrides and nested PK shapes

`_get_stream_primary_key` nicely flattens the "nested list" Airbyte PK structure (e.g., `[["id"], ["org_id"]]` → `["id", "org_id"]`). One subtle corner case is how this interacts with overrides coming from `set_primary_key`:

- `set_primary_key("stream", ["id", "org_id"])` stores `["id", "org_id"]` in `_primary_key_overrides`.
- `get_configured_catalog` then wraps that as `[self._primary_key_overrides[...]]`, yielding `primary_key=[["id", "org_id"]]` for the configured stream.
- `_get_stream_primary_key` sees `pk[0]` as a list and flattens via `[field[0] if isinstance(field, list) else field for field in pk]`, which would return only `"id"` in this specific override shape.

Would it be worth either:
- clarifying in docs that `set_primary_key` should be used only for single-field PKs for now, or
- adjusting the flattening logic/tests to cover the override case so composite PK overrides don't get silently truncated, wdyt?
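A quick demonstration of the truncation described above, using the flattening expression quoted from `_get_stream_primary_key` (values illustrative):

```python
# Shape produced by the set_primary_key override path described above.
pk = [["id", "org_id"]]

# The flattening expression from _get_stream_primary_key:
flattened = [field[0] if isinstance(field, list) else field for field in pk]

print(flattened)  # ['id'] -- the composite override is silently reduced to one field
```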
628-675: Tighten validation of pk_value (e.g., None) before coercing to string?

`_normalize_and_validate_pk_value` does a good job of enforcing single-field PKs and validating dict input against the expected field. Right now, any non-dict value - including `None` or empty strings - is accepted and coerced via `str(pk_value)` (Line [675]), which could lead to requests like `/users/None` or `/users/` if a caller accidentally passes a falsy/placeholder value.

Would you consider:
- explicitly rejecting `pk_value is None` (and perhaps `pk_value == ""`) with a `PyAirbyteInputError`, and
- maybe including the stream name and PK field in the error message to aid debugging,

so consumers get a clear input-validation failure instead of a surprising HTTP path, wdyt?
677-714: Confirm get_record is the only entrypoint that should call _normalize_and_validate_pk_value

`get_record` is currently the single public entrypoint using `_normalize_and_validate_pk_value`, and it correctly enforces `DeclarativeExecutor` before delegating. Given that `_normalize_and_validate_pk_value` is fairly generic, do you want to keep it private and only reachable via `get_record`, or would you prefer to document it (or expose a wrapper) for advanced users building their own request logic on top of `Source`?

If you intend it to remain strictly internal, maybe a brief comment above `_normalize_and_validate_pk_value` noting that it's "used only by `get_record` for now" could help future maintainers avoid calling it in incompatible contexts, wdyt?
📒 Files selected for processing (4)
- airbyte/_executors/declarative.py (2 hunks)
- airbyte/exceptions.py (1 hunks)
- airbyte/sources/base.py (2 hunks)
- tests/unit_tests/test_get_record.py (1 hunks)
🪛 GitHub Actions: Run Linters
tests/unit_tests/test_get_record.py
[error] 1-1: Ruff formatting failed: 1 file would be reformatted. Exit code 1 from 'poetry run ruff format --diff .'
PyTest Results (Full): 430 tests (+41), 414 ✅ (+41), 25m 39s ⏱️ (-1m 27s). Results for commit ba85aaf, compared against base commit 2981b3d. This pull request removes 1 and adds 42 tests. Note that renamed tests count towards both.
…ch_record()

- Remove ModelToComponentFactory usage in favor of accessing existing streams
- Add _unwrap_to_declarative_stream() helper to navigate concurrent wrappers
- Update fetch_record() to call declarative_source.streams() for existing components
- Fix unit tests to mock declarative_source property correctly
- Add type ignore comments for duck-typed attribute access

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte/_executors/declarative.py (1)
325-342: Tighten the JSON fallback and replace the bare except Exception: pass

The `try/except Exception: pass` around `response.json()` on Lines [327]-[331] both repeats the earlier "empty except" concern and makes it harder to notice unexpected decode failures. Would you consider:

- Switching to `with contextlib.suppress(Exception):` around the `response.json()` call (as you already do for `schema_loader.get_json_schema()`), so the intent to ignore parse errors is explicit without an empty `except`; and
- Optionally also handling the case where `response.json()` returns a non-empty list (e.g. a one-element `[{...}]`) by treating a single dict element as the record, to cover APIs that return arrays even for singleton lookups?

That would preserve the current not-found semantics while addressing the lint concern and slightly widening compatibility, wdyt?
```diff
-        if not records:
-            try:
-                response_json = response.json()
-                if isinstance(response_json, dict) and response_json:
-                    return response_json
-            except Exception:
-                pass
+        if not records:
+            with contextlib.suppress(Exception):
+                response_json = response.json()
+                if isinstance(response_json, dict) and response_json:
+                    return response_json
+                if (
+                    isinstance(response_json, list)
+                    and len(response_json) == 1
+                    and isinstance(response_json[0], dict)
+                ):
+                    return response_json[0]
```
🧹 Nitpick comments (1)
airbyte/_executors/declarative.py (1)
45-89: Consider iterating through wrapper chains in _unwrap_to_declarative_stream

Right now the helper only peeks one level through the known wrapper attributes and branch attrs (Lines [64]-[83]), so a future wrapper-of-wrapper that still ultimately exposes a `retriever` via the same attributes would end up raising `NotImplementedError` even though it could be unwrapped. Would you consider turning this into an iterative loop (tracking visited objects and repeatedly following `wrapper_attrs`/`branch_attr` until a `retriever` is found or a depth limit is reached) to make it more robust to additional wrapper layers while keeping the same error behavior when no `retriever` exists, wdyt?
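A rough sketch of the iterative approach; the wrapper attribute names mirror the review discussion and are assumptions about the real helper, not its actual implementation:

```python
# Sketch only: repeatedly follow known wrapper attributes until an object
# exposing a `retriever` is found, with cycle and depth protection.
def unwrap_to_declarative_stream(stream: object, max_depth: int = 10) -> object:
    wrapper_attrs = ("stream", "_stream", "_legacy_stream")  # assumed names
    current = stream
    seen: set[int] = set()
    for _ in range(max_depth):
        if getattr(current, "retriever", None) is not None:
            return current
        if id(current) in seen:
            break
        seen.add(id(current))
        for attr in wrapper_attrs:
            inner = getattr(current, attr, None)
            if inner is not None:
                current = inner
                break
        else:
            break
    raise NotImplementedError("Could not unwrap to a declarative stream with a retriever.")
```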
📒 Files selected for processing (2)
- airbyte/_executors/declarative.py (4 hunks)
- tests/unit_tests/test_get_record.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/unit_tests/test_get_record.py
- Remove config parameter from DeclarativeExecutor.fetch_record()
- Remove config argument from Source.get_record() call to fetch_record()
- Executor already has full config in self._config_dict, no need to pass it

Co-Authored-By: AJ Steers <[email protected]>
…rameter

The test was expecting the config parameter that was removed in the previous commit. Updated the assertion to match the new signature.

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (1)
airbyte/_executors/declarative.py (1)
321-327: Address the silent exception suppression with logging.

The empty `except` block at lines 326-327 silently suppresses exceptions when attempting to parse `response.json()`. This was flagged in previous review comments as needing an explanatory comment or logging.

While the current behavior (try to parse JSON, fall through to error if it fails) is reasonable, adding logging would help with debugging when unexpected responses occur.
Would you consider adding logging here as suggested in the previous review? For example:
```diff
+import logging
+
+logger = logging.getLogger(__name__)

 # ... in fetch_record method ...
         if not records:
             try:
                 response_json = response.json()
                 if isinstance(response_json, dict) and response_json:
                     return response_json
-            except Exception:
-                pass
+            except Exception as e:
+                logger.debug(
+                    "Failed to parse response as JSON when fetching record for stream '%s', "
+                    "primary key '%s': %s",
+                    stream_name,
+                    primary_key_value,
+                    str(e),
+                )
```

This provides visibility into parsing failures while maintaining the existing fallthrough behavior.
🧹 Nitpick comments (5)
airbyte/sources/base.py (2)
605-626: Consider adding type validation for primary key elements, wdyt?

At line 620, when `pk` is a flat list (not nested), we cast it to `list[str]` with a type ignore comment. However, there's no explicit validation that the elements are actually strings. If the catalog contains unexpected types, this could propagate invalid data downstream.

Would it be safer to explicitly validate and convert elements to strings? For example:

```diff
 if isinstance(pk, list) and len(pk) > 0:
     if isinstance(pk[0], list):
         return [field[0] if isinstance(field, list) else field for field in pk]
-    return list(pk)  # type: ignore[arg-type]
+    return [str(field) for field in pk]
```

This would ensure type safety and make the conversion explicit. Alternatively, if we trust the catalog format completely, a comment explaining why the type ignore is safe would help future maintainers.
628-675: Consider validating against None and empty primary key values, wdyt?

The method converts `pk_value` to a string without checking for `None` or empty values. If a user accidentally passes `None`, it would be converted to the string `"None"`, which is probably not the intended primary key value. Similarly, empty strings or whitespace-only strings might warrant validation.

Would it make sense to add validation like this after line 675?

```diff
     return str(pk_value[provided_key])
 return str(pk_value)
+
+def _normalize_and_validate_pk_value(
+    self,
+    stream_name: str,
+    pk_value: Any,  # noqa: ANN401
+) -> str:
+    """Normalize and validate a primary key value."""
+    # ... existing logic ...
+
+    result = str(pk_value)  # or the dict extraction above
+
+    if not result or result == "None":
+        raise exc.PyAirbyteInputError(
+            message=f"Primary key value cannot be None or empty for stream '{stream_name}'.",
+            input_value=str(pk_value),
+        )
+
+    return result
```

This would catch common user errors early with a clear message.
airbyte/_executors/declarative.py (3)
45-89: Consider validating that retriever is not None, wdyt?

The unwrapping logic checks `hasattr(unwrapped, "retriever")` at lines 76 and 82, but doesn't verify that the retriever attribute itself is not `None`. If a stream wrapper has a `retriever` attribute set to `None`, this function would return the wrapper, and the caller at line 249 would attempt to access it without a null check.

Would it be safer to add a not-None check? For example:

```diff
 if hasattr(stream, attr_name):
     unwrapped = getattr(stream, attr_name)
-    if unwrapped is not None and hasattr(unwrapped, "retriever"):
+    if unwrapped is not None and hasattr(unwrapped, "retriever") and unwrapped.retriever is not None:
         return unwrapped
```

Similarly for the branch attributes at lines 81-83. This would make the function more robust against unexpected CDK states.
257-267: Consider adding validation for path construction edge cases, wdyt?

The path construction at lines 264-267 assumes that `base_path` is well-formed and simply appends the primary key value. However, there are potential edge cases:

- What if `base_path` ends with multiple slashes (e.g., `"/users//"`)?
- What if `base_path` already contains a path segment that looks like a placeholder?
- What if `primary_key_value` contains URL-unsafe characters?

Would it be worth adding some defensive checks or URL encoding? For example:

```python
from urllib.parse import quote

# After line 262
if base_path:
    # Normalize the base path and encode the PK value
    normalized_base = base_path.rstrip("/")
    encoded_pk = quote(str(primary_key_value), safe="")
    fetch_path = f"{normalized_base}/{encoded_pk}"
else:
    fetch_path = quote(str(primary_key_value), safe="")
```

This would handle special characters in PK values (like spaces, slashes, etc.) more robustly. Though if this isn't a concern for the initial implementation, it could be deferred.
195-343: Consider breaking down fetch_record into smaller helper methods, wdyt?

The `fetch_record` method is quite long (149 lines) and handles multiple responsibilities: stream lookup, retriever validation, path construction, HTTP request, schema retrieval, record selection, and response parsing. This complexity is reflected in the linter suppressions (PLR0914, PLR0912, PLR0915).

While the current implementation is functional, would it be beneficial to extract some of these concerns into smaller, testable helper methods? For example:

```python
def _find_stream(self, stream_name: str, streams: list) -> object:
    """Find and return the target stream by name."""
    # Lines 220-245

def _build_fetch_path(self, retriever: SimpleRetriever, primary_key_value: str) -> str:
    """Construct the fetch path by appending PK to base path."""
    # Lines 257-267

def _send_fetch_request(self, retriever: SimpleRetriever, fetch_path: str, ...) -> Response:
    """Send the HTTP request to fetch the record."""
    # Lines 269-290

def _extract_record_from_response(self, response, retriever, stream_name, primary_key_value, ...) -> dict:
    """Extract and return the record from the HTTP response."""
    # Lines 304-343
```

This would make the code more maintainable, easier to test individual components, and reduce cognitive complexity. However, if you prefer to keep it as-is for the initial implementation, that's also reasonable.
📒 Files selected for processing (2)
- airbyte/_executors/declarative.py (4 hunks)
- airbyte/sources/base.py (2 hunks)
🔇 Additional comments (2)
airbyte/sources/base.py (1)
33-33: LGTM! The import is necessary for the `get_record` implementation and is properly placed.

airbyte/_executors/declarative.py (1)
6-6: LGTM! The new imports are necessary for the `fetch_record` implementation and are used appropriately throughout the code.

Also applies to: 19-20, 22-22
Actionable comments posted: 2
🧹 Nitpick comments (1)
tests/unit_tests/test_get_record.py (1)
17-90: Consider adding tests for additional DeclarativeExecutor edge cases?

Looking at the implementation in `airbyte/_executors/declarative.py`, a few edge cases could strengthen coverage:

- Non-SimpleRetriever case: When a stream uses a different retriever type (should raise `NotImplementedError`)
- Response is None: When `send_request()` returns `None` (should raise `AirbyteRecordNotFoundError`)
- Empty records with fallback: When `select_records()` returns empty but `response.json()` has data
- Stream unwrapping: When streams are wrapped and need unwrapping to find the declarative stream
These would catch regressions in the error-handling paths. Given that the PR notes this hasn't been tested with real connectors yet, these additional tests might catch issues before integration testing, wdyt?
📒 Files selected for processing (1)
- tests/unit_tests/test_get_record.py (1 hunks)
🔇 Additional comments (6)
tests/unit_tests/test_get_record.py (6)
1-16: LGTM! The imports and module setup are clean and appropriate for the test suite.

17-90: LGTM! The stream validation test covers both the success case and the stream-not-found error path effectively. The extensive mocking is appropriate for isolating the unit under test.

92-127: LGTM! Excellent coverage of various primary key formats. The test thoroughly validates the PK extraction and normalization logic.

129-148: LGTM! Good error case coverage for nonexistent streams.

205-220: LGTM! Properly validates that `get_record()` is restricted to declarative sources.

222-242: LGTM! The test effectively verifies that `get_record()` correctly delegates to `executor.fetch_record()` with proper parameter passing.
- Remove _unwrap_to_declarative_stream() function entirely
- Validate streams directly with isinstance(stream, AbstractStream)
- Replace getattr() with direct attribute access after validation
- Use guard statements instead of graceful error handling
- Remove contextlib.suppress() around schema loading
- Remove JSON fallback when select_records() returns empty
- Update test to use Mock(spec=AbstractStream) for proper type checking

This addresses review feedback to use strong typing and fail-fast error handling instead of duck-typing and graceful degradation.

Co-Authored-By: AJ Steers <[email protected]>
Address review feedback to validate stream name match first, then check type compatibility. This ensures we raise NotImplementedError for found streams of incompatible types rather than silently skipping them.

- Match stream by name first
- Validate AbstractStream type after name match
- Raise NotImplementedError with clear message for incompatible types
- Remove isinstance() filter from available_streams list

Co-Authored-By: AJ Steers <[email protected]>
…ack to get_record()

- Refactor _get_stream_primary_key() to use CatalogProvider.get_primary_keys()
- Handle both flat and nested primary key formats
- Add allow_scanning and scan_timeout_seconds parameters to get_record()
- Implement scanning fallback when direct fetch fails or for non-declarative sources
- Normalize dict keys with LowerCaseNormalizer for case-insensitive comparison
- Defer PK validation to avoid early catalog access on non-declarative executors

Addresses GitHub review comments from @aaronsteers on PR #872

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte/sources/base.py (2)
606-638: Consider simplifying by using CatalogProvider directly, wdyt?

The current implementation creates a temporary catalog (lines 623-630) just to call `CatalogProvider.get_primary_keys()`. Since `self.configured_catalog` already has properly formatted primary keys, could we simplify this to:

```python
def _get_stream_primary_key(self, stream_name: str) -> list[str]:
    """Get the primary key for a stream."""
    catalog_provider = CatalogProvider(self.configured_catalog)
    return catalog_provider.get_primary_keys(stream_name)
```

This would:
- Eliminate the manual catalog iteration and temporary object creation
- Reuse the validation logic already in `CatalogProvider`
- Let `CatalogProvider.get_configured_stream_info()` handle the stream-not-found case consistently

The defensive check for flat vs nested PK format (lines 620-621) shouldn't be needed since `get_configured_catalog()` (lines 452-456) already ensures primary keys are in the correct nested format.
719-740: Minor optimization: pk_value_str computed twice in fallback path

When a `DeclarativeExecutor` falls back to scanning (lines 726-729), `pk_value_str` is computed twice:

- Line 720: For the direct fetch attempt
- Line 740: For the scanning fallback

You could optimize by computing once before line 719:

```diff
+ pk_value_str = self._normalize_and_validate_pk_value(stream_name, pk_value)
  if isinstance(self.executor, DeclarativeExecutor):
-     pk_value_str = self._normalize_and_validate_pk_value(stream_name, pk_value)
      try:
          return self.executor.fetch_record(
```

Then remove line 740 since `pk_value_str` is already available. This avoids redundant validation when falling back to scanning, wdyt?
📒 Files selected for processing (1)
- airbyte/sources/base.py (2 hunks)
- Add 'from typing import Any' import
- Fix type annotation on line 183: pk_value: any -> pk_value: Any
- Fix type annotation on line 254: pk_value: any -> pk_value: Any

Addresses GitHub review comment #2547624128

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (4)
tests/unit_tests/test_get_record.py (4)
19-92: Stream validation test is solid; consider also asserting on the constructed fetch path?

The parametrized coverage for existing vs missing streams looks good and the mocking of `declarative_source.streams`/`SimpleRetriever` aligns with the implementation. Would it be worth adding an assertion that `send_request` is called with the expected `path` (including the `/primary_key_value` suffix) so regressions in path construction are caught here as well, wdyt?
94-150: _get_stream_primary_key test currently leans on CatalogProvider internals

These tests indirectly depend on `CatalogProvider` behavior via `_get_stream_primary_key`, which is nice for integration coverage but could become brittle if CDK/CatalogProvider internals change. Would you consider patching `CatalogProvider.get_primary_keys` (or injecting a small fake) so this test focuses purely on how Source wires primary_key shapes into the provider, while still leaving a separate integration test for the full stack, wdyt?
207-222: Non-declarative executor test could assert on the exact error message

The test already uses `match="only supported for declarative sources"`, which is helpful. If the error message ever grows, would you want to tighten this a bit more (e.g., anchor with `^`/`$` or assert on a dedicated error subclass) so refactors don't accidentally weaken the contract, or is the current level of coupling intentional, wdyt?
224-270: Executor mocks may not exercise the same isinstance path as production

In `get_record` tests you use `Mock(spec=DeclarativeExecutor)` as the executor. Given that `Source.get_record` currently branches on `isinstance(self.executor, DeclarativeExecutor)` (per `airbyte/sources/base.py`), a plain `Mock` won't satisfy that `isinstance` check, so these tests might not be hitting the same code path as a real `DeclarativeExecutor` instance. Would it be safer to use a lightweight `DeclarativeExecutor` subclass test double, or `create_autospec(DeclarativeExecutor, instance=True)`, so the `isinstance` branch is exercised and any future changes to the type check are properly covered, wdyt?
📒 Files selected for processing (1)
- tests/unit_tests/test_get_record.py (1 hunks)
🔇 Additional comments (1)
tests/unit_tests/test_get_record.py (1)
152-205: Great coverage of PK normalization edge cases

The parametrization here does a nice job covering strings, ints, dicts, composite keys, and missing PK definitions, and the expectations match the behavior described in `_normalize_and_validate_pk_value`. I don't see any obvious gaps in the input space given the current implementation.
- Add catalog_provider property to Source class that returns CatalogProvider instance
- Remove _get_stream_primary_key() helper method (lines 606-638)
- Replace both usages with self.catalog_provider.get_primary_keys()
- Update test to patch catalog_provider property instead of private method
- Remove tests for deleted private method (test_source_get_stream_primary_key)

This simplifies the code by using the existing CatalogProvider utility directly instead of maintaining duplicate primary key extraction logic.

Co-Authored-By: AJ Steers <[email protected]>
Devin, add MCP tools for the new capabilities. Specifically "get source record" and maybe others.

Thanks AJ! I'll add an MCP tool called `get_source_record`. The tool will follow the same pattern as the existing `read_source_stream_records` tool. Implementing now.
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte/sources/base.py (1)
711-729: Critical: Field name normalization mismatch in scanning logic.

This is the bug flagged in the previous review and it's still present. Here's what's happening:

- Line 712: `pk_field` comes from `catalog_provider.get_primary_keys()`, which returns normalized field names (e.g., `"id"` for a source field `"ID"`), as confirmed by the relevant code snippet showing `LowerCaseNormalizer.normalize()` is applied.
- Line 716: `get_records(stream_name)` uses the default `normalize_field_names=False`, so records contain original field names from the source (e.g., `{"ID": "123", ...}`).
- Line 729: `record_data.get(pk_field)` looks up the normalized name `"id"` in a record that has the original field name `"ID"`, which returns `None` and causes the match to fail.

Example that breaks:

- Source defines primary key: `"ID"` (uppercase)
- After catalog normalization: `pk_field = "id"` (lowercase)
- Record from source: `{"ID": "123", ...}` (original casing preserved)
- Lookup: `record_data.get("id")` → `None` ❌
- Match fails even though the record exists

The fix is straightforward: pass `normalize_field_names=True` to ensure records use normalized field names:

```diff
- for record in self.get_records(stream_name):
+ for record in self.get_records(stream_name, normalize_field_names=True):
```

Would you also want to add a unit test that covers a source with non-lowercase primary key field names to prevent regressions, wdyt?
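A tiny regression-style check of the mismatch, assuming `LowerCaseNormalizer.normalize()` lowercases field names as described above:

```python
from airbyte._util.name_normalizers import LowerCaseNormalizer

# The normalized catalog PK field vs. a record that keeps its original casing.
pk_field = LowerCaseNormalizer.normalize("ID")  # -> "id" (assumed behavior)
record_data = {"ID": "123"}

# The lookup misses because the record keeps its original field name:
assert record_data.get(pk_field) is None
```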
🧹 Nitpick comments (2)
airbyte/sources/base.py (2)
422-425: Consider caching the catalog_provider to avoid repeated instantiation.

The property creates a new `CatalogProvider` instance on each access. Looking at `get_record()`, it calls this property twice indirectly (via `_normalize_and_validate_pk_value` at line 691 and directly at line 712), creating two instances per call.

While `CatalogProvider` initialization is likely lightweight, would it make sense to cache the instance - perhaps as `self._catalog_provider` - and only recreate it when `configured_catalog` changes? Or is the instantiation cost negligible enough that this optimization isn't worth the added complexity, wdyt?
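One possible shape for the caching idea, as a sketch; it assumes `configured_catalog` is stable for the lifetime of the `Source` (invalidation would be needed otherwise):

```python
from functools import cached_property

from airbyte.shared.catalog_providers import CatalogProvider


class Source:  # illustrative fragment only, not the real Source class
    @cached_property
    def catalog_provider(self) -> CatalogProvider:
        # Reuse one provider per Source instance; repeated get_record()
        # calls then hit the cached value instead of re-instantiating.
        return CatalogProvider(self.configured_catalog)
```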
690-713: Optional: Avoid duplicate PK normalization in scanning path.

The code calls `_normalize_and_validate_pk_value()` twice when entering the scanning fallback:

- Line 691: First call (before attempting declarative fetch)
- Line 711: Second call (when entering scan fallback)

Since `pk_value_str` is already computed at line 691, you could reuse it at line 711 instead of re-normalizing. Similarly, `primary_key_fields` could be retrieved once. Would it be worth caching these values to avoid the redundant work, or is the simplicity of the current structure preferred? Wdyt?
📒 Files selected for processing (3)
- airbyte/mcp/local_ops.py (1 hunks)
- airbyte/sources/base.py (3 hunks)
- tests/unit_tests/test_get_record.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/unit_tests/test_get_record.py
🔇 Additional comments (3)
airbyte/mcp/local_ops.py (2)
464-469: Question: Is idempotent=True accurate for record fetching?

The decorator marks this tool as `idempotent=True`, which typically means repeated calls with the same inputs produce the same result. However, since the underlying data source could change between calls (records can be updated or deleted), the same `pk_value` might return different data or even fail on subsequent calls.

Would it make more sense to omit the `idempotent` flag or set it to `False`? Or is the idempotency claim intended at the "operation level" (i.e., the operation itself doesn't modify state), regardless of whether the returned data changes? Wdyt?
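If the flag were flipped, the change would be local to the decorator; a sketch only, since the `mcp_tool` signature beyond the `idempotent` keyword and the tool's parameter names are assumptions here:

```python
from airbyte.mcp._tool_utils import mcp_tool


# Sketch: marks the tool non-idempotent since the underlying data can change
# between calls, per the question above. Parameter names are illustrative.
@mcp_tool(idempotent=False)
def get_source_record(source_connector_name: str, stream_name: str, pk_value: str) -> dict:
    ...
```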
547-580: LGTM! Error handling pattern is appropriate.

The implementation correctly wraps `source.get_record()` with proper configuration resolution and error handling. The pattern of returning an error string instead of raising exceptions matches the existing MCP tool conventions in this file (e.g., `read_source_stream_records`), which is appropriate for MCP tools that need graceful degradation.

The stderr logging at lines 568-571 is also consistent with similar tools in this file.
airbyte/sources/base.py (1)
611-659: LGTM! PK validation logic is sound.

The method correctly handles the three input forms (string, int, dict) and properly validates composite PKs. The normalization logic at lines 647-648 is correct: both the user-provided key and the catalog's `pk_field` are normalized before comparison, ensuring case-insensitive matching.

The NotImplementedError for composite keys (lines 632-636) is appropriate given the PR objectives note this limitation.
feat: Add singleton record fetch and MCP tool for declarative sources
Summary
Implements `Source.get_record(stream_name, pk_value)` and `DeclarativeExecutor.fetch_record()` to enable fetching single records by primary key from declarative (YAML-based) sources, plus a new `get_source_record` MCP tool to expose this functionality to AI agents.

Key features:
- `source.get_record(stream_name, pk_value="123")` - public API for fetching a single record
- `AirbyteRecordNotFoundError` exception for missing records
- `get_source_record` MCP tool for AI agent integration
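A usage sketch of the optional scanning-fallback parameters added in this PR; the stream name and values are illustrative:

```python
# allow_scanning falls back to reading the stream and matching on the PK when
# a direct fetch isn't available (e.g., non-declarative sources); the timeout
# bounds that scan. Parameter names follow this PR's description.
record = source.get_record(
    "users",
    pk_value="123",
    allow_scanning=True,
    scan_timeout_seconds=60,
)
```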
Implementation approach:
- Reuses existing stream components from `ConcurrentDeclarativeSource.streams()`
- Appends `/{pk_value}` to the stream's base path
- Adds a `catalog_provider` property to the Source class for reusable CatalogProvider access

Review & Testing Checklist for Human
source.get_record("pokemon", pk_value="1")works end-to-endget_source_recordMCP tool works correctly with AI agents/MCP clients/{pk_value}appended to base path, which may not work for all REST API patternsretriever._request_headers()etc. are marked with noqa but could break in future CDK versionsTest Plan Recommendation
Notes
- URL construction assumes `base_path/{pk_value}`; may need customization for different API patterns
- The `dict(first_record.data)` conversion (lines 295-297 in declarative.py) could cause runtime errors with actual record structures

Requested by: AJ Steers (@aaronsteers, [email protected])
Devin session: https://app.devin.ai/sessions/9b1bcd7c48bf4d259675a0033564d3fb